package cn.tiakon.dmp.report

import cn.tiakon.dmp.untils.{ContextUtils, Utils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

object RepoADInArea {
  def main(args: Array[String]): Unit = {

    /*  val load = ConfigFactory.load()

      val sc: SparkContext = ContextUtils.getSparkContext()

      val sqlc: SQLContext = new SQLContext(sc)

      val dataFrame: DataFrame = sqlc.read.parquet(load.getString("output.parquet.path"))

      dataFrame.registerTempTable("logs")

      val selected: DataFrame = sqlc.sql("select provincename,cityname,requestmode,processnode from logs ")

      selected.show()*/


    val parquetPath: String = Utils.load.getString("output.parquet.path")

    val sc = ContextUtils.getSparkContext()
    val sqlc: SQLContext = new SQLContext(sc)

    //    原始请求数 有效请求 广告请求 参与竞价数 竞价成功数  展示量 点击量  广告成本 广告消费
    //    requestmode	processnode	iseffective	isbilling	isbid	iswin	adorderid

    val srcDataFrame: DataFrame = sqlc.read.parquet(parquetPath)

    //    srcDataFrame.show()

    val appInfo: RDD[((String, String), List[Double])] = srcDataFrame.map(rdd => {

      val provincename: String = rdd.getAs[String]("provincename")
      val cityname: String = rdd.getAs[String]("cityname")

      val requestmode: Int = rdd.getAs[Int]("requestmode")
      val processnode: Int = rdd.getAs[Int]("processnode")

      val iseffective: Int = rdd.getAs[Int]("iseffective")
      val isbilling: Int = rdd.getAs[Int]("isbilling")
      val isbid: Int = rdd.getAs[Int]("isbid")
      val iswin: Int = rdd.getAs[Int]("iswin")

      val adorderid: Int = rdd.getAs[Int]("adorderid")

      val winprice: Double = rdd.getAs[Double]("winprice")
      val adpayment: Double = rdd.getAs[Double]("adpayment")

      //      (原始请求数 有效请求 广告请求)
      val validRreq: List[Double] = if (requestmode == 1 && processnode >= 1)
        if (processnode >= 2) {
          if (processnode == 3) List(1, 1, 1)
          else List(1, 1, 0)
        } else List(1, 0, 0)
      else List(0, 0, 0)

      //      (参与竞价数)
      val bidding: List[Double] = if (iseffective == 1 && isbilling == 1 && isbid == 1 && adorderid != 0) List[Double](1)
      else List[Double](0)

      //      (竞价成功数,广告成本,广告消费)
      val BiddingCostsAndConsum: List[Double] = if (iseffective == 1 && isbilling == 1 && iswin == 1) List(1, 1, 1)
      else List(0, 0, 0)

      //      (展示量 点击量)
      val showAndClick: List[Double] = if (iseffective == 1 && requestmode == 2) {
        if (requestmode == 3) List(1, 1)
        else List(1, 0)
      } else List(0, 0)

      ((provincename, cityname), validRreq ++ bidding ++ BiddingCostsAndConsum ++ showAndClick)
    })


    //    val info: RDD[((String, String), ListBuffer[Double])] = appInfo

    val resultRDD: RDD[((String, String), List[Double])] = appInfo.reduceByKey((lb1, lb2) => {
      val tuplesLB: List[(Double, Double)] = lb1.zip(lb2)
      tuplesLB.map(t => t._1 + t._2)
    })

    Utils.deleteFileByCoreSite(sc,"D:\\dmp-testdata\\output\\report-ad-in-area")

    resultRDD.saveAsTextFile("D:\\dmp-testdata\\output\\report-ad-in-area")

    //    tuplesArrayInExecutor.toBuffer.foreach(println)
    //    appInfo.take(100).toBuffer.foreach(println)

  }
}
